[[headers]]
= Message Headers

The 0.11.0.0 client introduced support for headers in messages.
As of version 2.0, Spring for Apache Kafka now supports mapping these headers to and from `spring-messaging` `MessageHeaders`.

NOTE: Previous versions mapped `ConsumerRecord` and `ProducerRecord` to spring-messaging `Message<?>`, where the value property was mapped to and from the `payload` and other properties (`topic`, `partition`, and so on) were mapped to headers.
This is still the case, but additional (arbitrary) headers can now be mapped.

Apache Kafka headers have a simple API, shown in the following interface definition:

[source, java]
----
public interface Header {

    String key();

    byte[] value();

}
----

The `KafkaHeaderMapper` strategy is provided to map header entries between Kafka `Headers` and `MessageHeaders`.
Its interface definition is as follows:

[source, java]
----
public interface KafkaHeaderMapper {

    void fromHeaders(MessageHeaders headers, Headers target);

    void toHeaders(Headers source, Map<String, Object> target);

}
----

The `SimpleKafkaHeaderMapper` maps raw headers as `byte[]`, with configuration options for conversion to `String` values.

The `DefaultKafkaHeaderMapper` maps the key to the `MessageHeaders` header name and, to support rich header types for outbound messages, performs JSON conversion.
A +++"+++`special`+++"+++ header (with a key of `spring_json_header_types`) contains a JSON map of `<key>:<type>`.
This header is used on the inbound side to provide appropriate conversion of each header value to the original type.

On the inbound side, all Kafka `Header` instances are mapped to `MessageHeaders`.
On the outbound side, by default, all `MessageHeaders` are mapped, except `id`, `timestamp`, and the headers that map to `ConsumerRecord` properties.

You can specify which headers are to be mapped for outbound messages by providing patterns to the mapper.
The following listing shows the available constructors:

[source, java]
----
public DefaultKafkaHeaderMapper() { <1>
    ...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) { <2>
    ...
}

public DefaultKafkaHeaderMapper(String... patterns) { <3>
    ...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { <4>
    ...
}
----

<1> Uses a default Jackson `ObjectMapper` and maps most headers, as discussed before the example.
<2> Uses the provided Jackson `ObjectMapper` and maps most headers, as discussed before the example.
<3> Uses a default Jackson `ObjectMapper` and maps headers according to the provided patterns.
<4> Uses the provided Jackson `ObjectMapper` and maps headers according to the provided patterns.

Patterns are rather simple and can contain a leading wildcard (`+++*+++`), a trailing wildcard, or both (for example, `+++*+++.cat.+++*+++`).
You can negate patterns with a leading `!`.
The first pattern that matches a header name (whether positive or negative) wins.
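
As a rough illustration of these matching rules, the following self-contained sketch applies an ordered list of patterns to a header name using only the JDK.
It is not the mapper's actual implementation: `HeaderPatternSketch`, `shouldMap`, and `matches` are hypothetical names, and treating a header that matches no pattern as unmapped is an assumption of the sketch.

[source, java]
----
import java.util.List;

/**
 * Illustrative stand-in for the pattern rules described above.
 * This is NOT the mapper's implementation; all names here are hypothetical.
 */
final class HeaderPatternSketch {

    private HeaderPatternSketch() {
    }

    /** Returns true when the first pattern that matches the name is a positive one. */
    static boolean shouldMap(String headerName, List<String> patterns) {
        for (String pattern : patterns) {
            boolean negated = pattern.startsWith("!");
            String body = negated ? pattern.substring(1) : pattern;
            if (matches(body, headerName)) {
                return !negated; // first match wins, whether positive or negative
            }
        }
        return false; // assumption of this sketch: no matching pattern means "not mapped"
    }

    /** Handles a leading wildcard, a trailing wildcard, both, or an exact name. */
    static boolean matches(String pattern, String name) {
        if (pattern.equals("*")) {
            return true;
        }
        boolean leading = pattern.startsWith("*");
        boolean trailing = pattern.endsWith("*");
        String literal = pattern.substring(leading ? 1 : 0, pattern.length() - (trailing ? 1 : 0));
        if (leading && trailing) {
            return name.contains(literal);
        }
        if (leading) {
            return name.endsWith(literal);
        }
        if (trailing) {
            return name.startsWith(literal);
        }
        return name.equals(literal);
    }

    public static void main(String[] args) {
        // Negated patterns come first so that they win over the catch-all at the end.
        List<String> patterns = List.of("!id", "!timestamp", "!abc*", "*.cat.*", "*");
        for (String name : List.of("id", "abcFoo", "my.cat.header", "other")) {
            System.out.println(name + " -> " + shouldMap(name, patterns));
        }
    }
}
----

Because the first match wins, the order of the list matters: in the sketch, the negated patterns are placed ahead of the catch-all wildcard so that they are evaluated first.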

When you provide your own patterns, we recommend including `!id` and `!timestamp`, since these headers are read-only on the inbound side.

IMPORTANT: By default, the mapper deserializes only classes in `java.lang` and `java.util`.
You can trust other (or all) packages by adding trusted packages with the `addTrustedPackages` method.
If you receive messages from untrusted sources, you may wish to add only those packages you trust.
To trust all packages, you can use `mapper.addTrustedPackages("+++*+++")`.
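
A minimal configuration fragment along these lines might look as follows.
The class name `HeaderMapperConfig` and the package `com.example.events` are hypothetical placeholders; substitute the packages that hold your own header types.

[source, java]
----
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;

@Configuration
public class HeaderMapperConfig {

    @Bean
    public DefaultKafkaHeaderMapper headerMapper() {
        DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
        // "com.example.events" is a hypothetical package; list only packages you trust.
        mapper.addTrustedPackages("com.example.events");
        return mapper;
    }

}
----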

NOTE: Mapping `String` header values in a raw form is useful when communicating with systems that are not aware of the mapper's JSON format.

Starting with version 2.2.5, you can specify that certain string-valued headers should not be mapped using JSON, but to/from a raw `byte[]`.
The `AbstractKafkaHeaderMapper` has new properties for this purpose.
When `mapAllStringsOut` is set to `true`, all string-valued headers are converted to `byte[]` using the `charset` property (default `UTF-8`).
In addition, the `rawMappedHeaders` property is a map of `header name : boolean`; if the map contains a header name and the header contains a `String` value, the value is mapped as a raw `byte[]` using the charset.
This map is also used to map raw incoming `byte[]` headers to `String` using the charset if, and only if, the boolean in the map value is `true`.
If the boolean is `false`, or the header name is not in the map, the incoming header is mapped as the raw, unconverted header value.

The following test case illustrates this mechanism.

[source, java]
----
@Test
public void testSpecificStringConvert() {
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
    Map<String, Boolean> rawMappedHeaders = new HashMap<>();
    rawMappedHeaders.put("thisOnesAString", true);
    rawMappedHeaders.put("thisOnesBytes", false);
    mapper.setRawMappedHeaders(rawMappedHeaders);
    Map<String, Object> headersMap = new HashMap<>();
    headersMap.put("thisOnesAString", "thing1");
    headersMap.put("thisOnesBytes", "thing2");
    headersMap.put("alwaysRaw", "thing3".getBytes());
    MessageHeaders headers = new MessageHeaders(headersMap);
    Headers target = new RecordHeaders();
    mapper.fromHeaders(headers, target);
    assertThat(target).containsExactlyInAnyOrder(
            new RecordHeader("thisOnesAString", "thing1".getBytes()),
            new RecordHeader("thisOnesBytes", "thing2".getBytes()),
            new RecordHeader("alwaysRaw", "thing3".getBytes()));
    headersMap.clear();
    mapper.toHeaders(target, headersMap);
    assertThat(headersMap).contains(
            entry("thisOnesAString", "thing1"),
            entry("thisOnesBytes", "thing2".getBytes()),
            entry("alwaysRaw", "thing3".getBytes()));
}
----

Both header mappers map all inbound headers, by default.
Starting with version 2.8.8, the patterns can also be applied to inbound mapping.
To create a mapper for inbound mapping, use one of the static methods on the respective mapper:

[source, java]
----
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}

public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}

public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
----

For example:

[source, java]
----
DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");
----

This will exclude all headers beginning with `abc` and include all others.

By default, the `DefaultKafkaHeaderMapper` is used in the `MessagingMessageConverter` and `BatchMessagingMessageConverter`, as long as Jackson is on the classpath.

With the batch converter, the converted headers are available in the `KafkaHeaders.BATCH_CONVERTED_HEADERS` header as a `List<Map<String, Object>>`, where the map at each position in the list corresponds to the data at the same position in the payload.

If there is no converter (either because Jackson is not present or it is explicitly set to `null`), the headers from the consumer record are provided unconverted in the `KafkaHeaders.NATIVE_HEADERS` header.
This header is a `Headers` object (or a `List<Headers>` in the case of the batch converter), where the position in the list corresponds to the data position in the payload.

IMPORTANT: Certain types are not suitable for JSON serialization, and a simple `toString()` serialization might be preferred for these types.
The `DefaultKafkaHeaderMapper` has a method called `addToStringClasses()` that lets you supply the names of classes that should be treated this way for outbound mapping.
During inbound mapping, they are mapped as `String`.
By default, only `org.springframework.util.MimeType` and `org.springframework.http.MediaType` are mapped this way.

NOTE: Starting with version 2.3, handling of String-valued headers is simplified.
Such headers are no longer JSON encoded, by default (i.e. they do not have enclosing `"+++...+++"` added).
The type is still added to the JSON_TYPES header (`spring_json_header_types`) so that the receiving system can convert the value back to a `String` (from `byte[]`).
The mapper can handle (decode) headers produced by older versions (it checks for a leading `+++"+++`); in this way an application using 2.3 can consume records from older versions.
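
The difference between the two encodings, and the leading-quote check, can be pictured with the following JDK-only sketch.
It is a simplified illustration rather than the mapper's code: `StringHeaderEncodingSketch` and its methods are hypothetical names, and JSON escaping of special characters is deliberately ignored.

[source, java]
----
import java.nio.charset.StandardCharsets;

/**
 * Simplified illustration of the two string encodings discussed above.
 * NOT the mapper's code: names are hypothetical and JSON escaping is ignored.
 */
final class StringHeaderEncodingSketch {

    private StringHeaderEncodingSketch() {
    }

    /** Older style: the value is wrapped in double quotes, as a JSON string would be. */
    static byte[] encodeQuoted(String value) {
        return ("\"" + value + "\"").getBytes(StandardCharsets.UTF_8);
    }

    /** Newer default style: the raw characters, with no enclosing quotes. */
    static byte[] encodePlain(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    /** Accepts either form by checking for enclosing quotes before stripping them. */
    static String decode(byte[] raw) {
        String text = new String(raw, StandardCharsets.UTF_8);
        if (text.length() >= 2 && text.startsWith("\"") && text.endsWith("\"")) {
            return text.substring(1, text.length() - 1);
        }
        return text;
    }

    public static void main(String[] args) {
        // Both encodings decode to the same string value.
        System.out.println(decode(encodeQuoted("thing1")));
        System.out.println(decode(encodePlain("thing1")));
    }
}
----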

IMPORTANT: To remain compatible with earlier versions, set `encodeStrings` to `true` if records produced by an application using 2.3 or later might be consumed by applications using earlier versions.
When all applications are using 2.3 or higher, you can leave the property at its default value of `false`.

[source, java]
----
@Bean
MessagingMessageConverter converter() {
    MessagingMessageConverter converter = new MessagingMessageConverter();
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
    mapper.setEncodeStrings(true);
    converter.setHeaderMapper(mapper);
    return converter;
}
----

If you use Spring Boot, it auto-configures this converter bean into the auto-configured `KafkaTemplate`; otherwise, you should add this converter to the template yourself.

